MapReduce执行流程、切片源码分析

Posted by Jackson on 2017-08-27

MapReduce源码分析:

建议:写一个简单的MapReduce程序,使用Debug进行跟着源码一步一步的查看

入口点boolean flag = job.waitForCompletion(true);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public boolean waitForCompletion {
主要调用了submit方法
submit(){
setUseNewAPI() // 设置新老API的兼容
connect(); // new Cluster() 设置集群连接信息
public JobStatus run() {
return submitter.submitJobInternal(Job.this, cluster){
checkSpecs(job) // 检查输出路径是否存在
addMRFrameworkToDistributedCache(conf) // 添加到分布式的缓存当中
int maps = writeSplits(job, submitJobDir){
writeNewSplits(job, jobSubmitDir){
List<InputSplit> splits = input.getSplits(job){

}
}
}
}
}
}

获得切片的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public List<InputSplit> getSplits(JobContext job) throws IOException {// 生成文件并且将他们进行切片
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); // 求minSize
long maxSize = getMaxSplitSize(job); // 求MaxSize
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(blockSize, minSize, maxSize); // 计算splits的大小

long bytesRemaining = length;
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {// 设置百分之10的东西 SPLIT_SLOP = 1.1;
int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
splits.add(makeSplit(path, length-bytesRemaining, splitSize,
blkLocations[blkIndex].getHosts(),
blkLocations[blkIndex].getCachedHosts()));
bytesRemaining -= splitSize;
}
return splits;
}
1
2
3
protected long getFormatMinSplitSize() {// 固定值为 1
return 1;
}
1
private static final double SPLIT_SLOP = 1.1;   // 10% slop
1
2
3
public static long getMinSplitSize(JobContext job) {// 默认情况下是 1
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
1
2
3
4
public static long getMaxSplitSize(JobContext context) { // 默认情况是Long类型的最大值
return context.getConfiguration().getLong(SPLIT_MAXSIZE,
Long.MAX_VALUE);
}
1
2
3
long blockSize = file.getBlockSize(){ //这里返回的是128M
return blocksize;
}

// 最终计算分片的大小值

1
2
3
4
long splitSize = computeSplitSize(blockSize, minSize, maxSize){
1 Long.maxValue 128M
return Math.max(minSize, Math.min(maxSize, blockSize));
}

注意:如果想要控制切片的大小,可以控制上面的 maxSize 的大小,当maxSize小于blockSize时候控制maxSize的大小可以
改变切片的大小,当maxSize的大小大于blockSize的大小的时候,切片的大小就是blockSize 的大小

切片是在代码级别对文件进行的切片,并不是完全按照blocksize大小
blocksize是指概念上对HDFS上的文件进行切分